// Copyright Epic Games, Inc. All Rights Reserved.

#include "jupiterremoteprojectstore.h"

#include <zencore/compress.h>
#include <zencore/fmtutils.h>

#include <upstream/jupiter.h>
#include <zenhttp/auth/authmgr.h>

namespace zen {

using namespace std::literals;

// RemoteProjectStore implementation backed by a cloud cache service reached through a CloudCacheClient.
//
// The oplog container is stored as a ref addressed by namespace/bucket/key, and attachments are stored
// as compressed blobs addressed by their raw hash within the same namespace. Every request made through
// a CloudCacheSession is folded into the transfer statistics reported by GetStats().
class JupiterRemoteStore : public RemoteProjectStore
{
public:
	// CloudClient			  - client used to open a session for each request
	// Namespace / Bucket	  - location of the container ref (attachments only use Namespace)
	// Key					  - key of the container ref that is saved/loaded
	// OptionalBaseKey		  - key of a base container ref, or IoHash::Zero when there is none
	// ForceDisableBlocks	  - when true, GetInfo() reports CreateBlocks = false
	// ForceDisableTempBlocks - when true, GetInfo() reports UseTempBlockFiles = false
	// TempFilePath			  - path handed to the session when fetching compressed blobs
	JupiterRemoteStore(Ref<CloudCacheClient>&&		CloudClient,
					   std::string_view				Namespace,
					   std::string_view				Bucket,
					   const IoHash&				Key,
					   const IoHash&				OptionalBaseKey,
					   bool							ForceDisableBlocks,
					   bool							ForceDisableTempBlocks,
					   const std::filesystem::path& TempFilePath)
	: m_CloudClient(std::move(CloudClient))
	, m_Namespace(Namespace)
	, m_Bucket(Bucket)
	, m_Key(Key)
	, m_OptionalBaseKey(OptionalBaseKey)
	, m_TempFilePath(TempFilePath)
	{
		// Both features default to enabled; the flags can only switch them off.
		if (ForceDisableBlocks)
		{
			m_EnableBlocks = false;
		}
		if (ForceDisableTempBlocks)
		{
			m_UseTempBlocks = false;
		}
	}

	// Describes this store: block settings, the container name and a human readable description.
	// BaseContainerName (and the " Base ..." description suffix) is empty when no base key is configured.
	virtual RemoteStoreInfo GetInfo() const override
	{
		return {.CreateBlocks	   = m_EnableBlocks,
				.UseTempBlockFiles = m_UseTempBlocks,
				.AllowChunking	   = true,
				.ContainerName	   = fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_Key),
				// The base container is addressed by the base key, not by the key of the main container.
				.BaseContainerName =
					m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format("{}/{}/{}", m_Namespace, m_Bucket, m_OptionalBaseKey),
				.Description = fmt::format("[cloud] {} as {}/{}/{}{}"sv,
										   m_CloudClient->ServiceUrl(),
										   m_Namespace,
										   m_Bucket,
										   m_Key,
										   m_OptionalBaseKey == IoHash::Zero ? "" : fmt::format(" Base {}", m_OptionalBaseKey))};
	}

	// Returns a snapshot of the counters accumulated by AddStats().
	// Each counter is loaded individually, so the snapshot is not atomic as a whole.
	virtual Stats GetStats() const override
	{
		return {.m_SentBytes		 = m_SentBytes.load(),
				.m_ReceivedBytes	 = m_ReceivedBytes.load(),
				.m_RequestTimeNS	 = m_RequestTimeNS.load(),
				.m_RequestCount		 = m_RequestCount.load(),
				.m_PeakSentBytes	 = m_PeakSentBytes.load(),
				.m_PeakReceivedBytes = m_PeakReceivedBytes.load(),
				.m_PeakBytesPerSec	 = m_PeakBytesPerSec.load()};
	}

	// Stores Payload as the container ref (compact binary object) at namespace/bucket/key.
	// The result carries the hashes reported in PutResult.Needs and the raw hash of the stored ref.
	virtual SaveResult SaveContainer(const IoBuffer& Payload) override
	{
		CloudCacheSession Session(m_CloudClient.Get());
		PutRefResult	  PutResult = Session.PutRef(m_Namespace, m_Bucket, m_Key, Payload, ZenContentType::kCbObject);
		AddStats(PutResult);

		SaveResult Result{ConvertResult(PutResult), {PutResult.Needs.begin(), PutResult.Needs.end()}, PutResult.RawHash};
		if (Result.ErrorCode)
		{
			Result.Reason = fmt::format("Failed saving oplog container to {}/{}/{}/{}. Reason: '{}'",
										m_CloudClient->ServiceUrl(),
										m_Namespace,
										m_Bucket,
										m_Key,
										Result.Reason);
		}
		return Result;
	}

	// Uploads a single compressed attachment, addressed by RawHash, to the namespace.
	virtual SaveAttachmentResult SaveAttachment(const CompositeBuffer& Payload, const IoHash& RawHash) override
	{
		CloudCacheSession Session(m_CloudClient.Get());
		CloudCacheResult  PutResult = Session.PutCompressedBlob(m_Namespace, RawHash, Payload);
		AddStats(PutResult);

		SaveAttachmentResult Result{ConvertResult(PutResult)};
		if (Result.ErrorCode)
		{
			Result.Reason = fmt::format("Failed saving oplog attachment to {}/{}/{}. Reason: '{}'",
										m_CloudClient->ServiceUrl(),
										m_Namespace,
										RawHash,
										Result.Reason);
		}
		return Result;
	}

	// Uploads every chunk one at a time via SaveAttachment(), stopping at the first failure and
	// returning that failure. Each chunk is wrapped as a compressed buffer without validation and
	// its raw hash is decoded from the buffer itself.
	virtual SaveAttachmentsResult SaveAttachments(const std::vector<SharedBuffer>& Chunks) override
	{
		SaveAttachmentsResult Result;
		for (const SharedBuffer& Chunk : Chunks)
		{
			CompressedBuffer	 Compressed	 = CompressedBuffer::FromCompressedNoValidate(Chunk.AsIoBuffer());
			SaveAttachmentResult ChunkResult = SaveAttachment(Compressed.GetCompressed(), Compressed.DecodeRawHash());
			if (ChunkResult.ErrorCode)
			{
				return SaveAttachmentsResult{ChunkResult};
			}
		}
		return Result;
	}

	// Finalizes the container ref at namespace/bucket/key against RawHash.
	// The result carries the hashes reported in FinalizeRefResult.Needs.
	virtual FinalizeResult FinalizeContainer(const IoHash& RawHash) override
	{
		CloudCacheSession Session(m_CloudClient.Get());
		FinalizeRefResult FinalizeRefResult = Session.FinalizeRef(m_Namespace, m_Bucket, m_Key, RawHash);
		AddStats(FinalizeRefResult);

		FinalizeResult Result{ConvertResult(FinalizeRefResult), {FinalizeRefResult.Needs.begin(), FinalizeRefResult.Needs.end()}};
		if (Result.ErrorCode)
		{
			Result.Reason = fmt::format("Failed finalizing oplog container to {}/{}/{}/{}. Reason: '{}'",
										m_CloudClient->ServiceUrl(),
										m_Namespace,
										m_Bucket,
										m_Key,
										Result.Reason);
		}
		return Result;
	}

	// Loads the container ref stored under the main key.
	virtual LoadContainerResult LoadContainer() override { return LoadContainer(m_Key); }

	// Loads the container ref stored under the base key.
	// When no base key is configured no request is made and the result carries the NoContent code.
	virtual LoadContainerResult LoadBaseContainer() override
	{
		if (m_OptionalBaseKey == IoHash::Zero)
		{
			return LoadContainerResult{{.ErrorCode = static_cast<int>(HttpResponseCode::NoContent)}};
		}
		return LoadContainer(m_OptionalBaseKey);
	}

	// Queries the namespace for the given attachment hashes with a single request.
	// The result carries the hashes reported in ExistsResult.Needs.
	virtual HasAttachmentsResult HasAttachments(const std::span<IoHash> RawHashes) override
	{
		CloudCacheSession	   Session(m_CloudClient.Get());
		CloudCacheExistsResult ExistsResult =
			Session.CompressedBlobExists(m_Namespace, std::set<IoHash>(RawHashes.begin(), RawHashes.end()));
		AddStats(ExistsResult);

		HasAttachmentsResult Result{ConvertResult(ExistsResult),
									std::unordered_set<IoHash, IoHash::Hasher>(ExistsResult.Needs.begin(), ExistsResult.Needs.end())};
		// Test the converted code: ConvertResult() also reports an error when the request was unsuccessful
		// without an explicit error code, and that case should get the contextual reason as well.
		if (Result.ErrorCode)
		{
			Result.Reason = fmt::format("Failed checking attachment existance in {}/{}. Reason: '{}'",
										m_CloudClient->ServiceUrl(),
										m_Namespace,
										Result.Reason);
		}
		return Result;
	}

	// Fetches a single compressed attachment by raw hash; the response payload is moved into the result.
	virtual LoadAttachmentResult LoadAttachment(const IoHash& RawHash) override
	{
		CloudCacheSession Session(m_CloudClient.Get());
		CloudCacheResult  GetResult = Session.GetCompressedBlob(m_Namespace, RawHash, m_TempFilePath);
		AddStats(GetResult);

		LoadAttachmentResult Result{ConvertResult(GetResult), std::move(GetResult.Response)};
		// Test the converted code: ConvertResult() also reports an error when the request was unsuccessful
		// without an explicit error code, and that case should get the contextual reason as well.
		if (Result.ErrorCode)
		{
			Result.Reason = fmt::format("Failed fetching oplog attachment from {}/{}/{}. Reason: '{}'",
										m_CloudClient->ServiceUrl(),
										m_Namespace,
										RawHash,
										Result.Reason);
		}
		return Result;
	}

	// Fetches the attachments one at a time via LoadAttachment(), stopping at the first failure and
	// returning that failure. Fetched payloads are wrapped as compressed buffers without validation.
	virtual LoadAttachmentsResult LoadAttachments(const std::vector<IoHash>& RawHashes) override
	{
		LoadAttachmentsResult Result;
		for (const IoHash& Hash : RawHashes)
		{
			LoadAttachmentResult ChunkResult = LoadAttachment(Hash);
			if (ChunkResult.ErrorCode)
			{
				return LoadAttachmentsResult{ChunkResult};
			}
			ZEN_DEBUG("Loaded attachment in {}", NiceTimeSpanMs(static_cast<uint64_t>(ChunkResult.ElapsedSeconds * 1000)));
			Result.Chunks.emplace_back(
				std::pair<IoHash, CompressedBuffer>{Hash, CompressedBuffer::FromCompressedNoValidate(std::move(ChunkResult.Bytes))});
		}
		return Result;
	}

private:
	// Fetches the ref stored under Key and parses it as a compact binary object.
	// Fails with the converted request error when the fetch fails, or with InternalServerError
	// when the payload does not yield a compact binary object.
	LoadContainerResult LoadContainer(const IoHash& Key)
	{
		CloudCacheSession Session(m_CloudClient.Get());
		CloudCacheResult  GetResult = Session.GetRef(m_Namespace, m_Bucket, Key, ZenContentType::kCbObject);
		AddStats(GetResult);
		if (GetResult.ErrorCode || !GetResult.Success)
		{
			LoadContainerResult Result{ConvertResult(GetResult)};
			Result.Reason = fmt::format("Failed fetching oplog container from {}/{}/{}/{}. Reason: '{}'",
										m_CloudClient->ServiceUrl(),
										m_Namespace,
										m_Bucket,
										Key,
										Result.Reason);
			return Result;
		}

		CbObject ContainerObject = LoadCompactBinaryObject(GetResult.Response);
		if (!ContainerObject)
		{
			return LoadContainerResult{
				RemoteProjectStore::Result{.ErrorCode	   = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError),
										   .ElapsedSeconds = GetResult.ElapsedSeconds,
										   .Reason = fmt::format("The ref {}/{}/{}/{} is not formatted as a compact binary object"sv,
																 m_CloudClient->ServiceUrl(),
																 m_Namespace,
																 m_Bucket,
																 Key)},
				{}};
		}
		return LoadContainerResult{ConvertResult(GetResult), std::move(ContainerObject)};
	}

	// Folds one request into the running totals and peak values reported by GetStats().
	void AddStats(const CloudCacheResult& Result)
	{
		m_SentBytes.fetch_add(gsl::narrow<uint64_t>(Result.SentBytes));
		m_ReceivedBytes.fetch_add(gsl::narrow<uint64_t>(Result.ReceivedBytes));
		// ElapsedSeconds is converted to nanoseconds.
		m_RequestTimeNS.fetch_add(static_cast<uint64_t>(Result.ElapsedSeconds * 1000000000));
		SetAtomicMax(m_PeakSentBytes, Result.SentBytes);
		SetAtomicMax(m_PeakReceivedBytes, Result.ReceivedBytes);
		// Skip the throughput sample when no time elapsed to avoid dividing by zero.
		if (Result.ElapsedSeconds > 0.0)
		{
			uint64_t BytesPerSec = static_cast<uint64_t>((Result.SentBytes + Result.ReceivedBytes) / Result.ElapsedSeconds);
			SetAtomicMax(m_PeakBytesPerSec, BytesPerSec);
		}

		m_RequestCount.fetch_add(1);
	}

	// Translates a cloud cache result into a RemoteProjectStore::Result.
	//
	// ErrorCode is the response's own error code when non-zero, InternalServerError when the request
	// was unsuccessful without an error code, and 0 otherwise. For failed requests with a text, JSON
	// or compact binary body, Text holds a newline followed by the body (compact binary is rendered
	// as JSON).
	static Result ConvertResult(const CloudCacheResult& Response)
	{
		std::string Text;
		int32_t		ErrorCode = 0;
		if (Response.ErrorCode != 0 || !Response.Success)
		{
			if (Response.Response)
			{
				HttpContentType ContentType = Response.Response.GetContentType();
				if (ContentType == ZenContentType::kText || ContentType == ZenContentType::kJSON)
				{
					ExtendableStringBuilder<256> SB;
					SB.Append("\n");
					SB.Append(std::string_view(reinterpret_cast<const std::string::value_type*>(Response.Response.GetData()),
											   Response.Response.GetSize()));
					Text = SB.ToString();
				}
				else if (ContentType == ZenContentType::kCbObject)
				{
					ExtendableStringBuilder<256> SB;
					SB.Append("\n");
					CompactBinaryToJson(Response.Response.GetView(), SB);
					Text = SB.ToString();
				}
			}
		}
		if (Response.ErrorCode != 0)
		{
			ErrorCode = Response.ErrorCode;
		}
		else if (!Response.Success)
		{
			ErrorCode = gsl::narrow<int32_t>(HttpResponseCode::InternalServerError);
		}
		// Text is a local that is not used again, so hand its storage over instead of copying it.
		return {.ErrorCode = ErrorCode, .ElapsedSeconds = Response.ElapsedSeconds, .Reason = Response.Reason, .Text = std::move(Text)};
	}

	Ref<CloudCacheClient> m_CloudClient;
	const std::string	  m_Namespace;
	const std::string	  m_Bucket;
	const IoHash		  m_Key;
	const IoHash		  m_OptionalBaseKey;  // IoHash::Zero when there is no base container
	std::filesystem::path m_TempFilePath;
	bool				  m_EnableBlocks  = true;
	bool				  m_UseTempBlocks = true;

	// Request statistics. Zeroed explicitly: before C++20 a default-constructed std::atomic holds an
	// indeterminate value, which GetStats()/AddStats() would otherwise read.
	std::atomic_uint64_t m_SentBytes{0};
	std::atomic_uint64_t m_ReceivedBytes{0};
	std::atomic_uint64_t m_RequestTimeNS{0};
	std::atomic_uint64_t m_RequestCount{0};
	std::atomic_uint64_t m_PeakSentBytes{0};
	std::atomic_uint64_t m_PeakReceivedBytes{0};
	std::atomic_uint64_t m_PeakBytesPerSec{0};
};

std::shared_ptr<RemoteProjectStore>
CreateJupiterRemoteStore(const JupiterRemoteStoreOptions& Options, const std::filesystem::path& TempFilePath)
{
	std::string Url = Options.Url;
	if (Url.find("://"sv) == std::string::npos)
	{
		// Assume https URL
		Url = fmt::format("https://{}"sv, Url);
	}
	CloudCacheClientOptions ClientOptions{.Name			  = "Remote store"sv,
										  .ServiceUrl	  = Url,
										  .ConnectTimeout = std::chrono::milliseconds(2000),
										  .Timeout		  = std::chrono::milliseconds(1800000),
										  .AssumeHttp2	  = Options.AssumeHttp2,
										  .AllowResume	  = true,
										  .RetryCount	  = 4};
	// 1) Access token as parameter in request
	// 2) Environment variable (different win vs linux/mac)
	// 3) openid-provider (assumes oidctoken.exe -Zen true has been run with matching Options.OpenIdProvider

	std::unique_ptr<CloudCacheTokenProvider> TokenProvider;
	if (!Options.AccessToken.empty())
	{
		TokenProvider = CloudCacheTokenProvider::CreateFromCallback([AccessToken = "Bearer " + Options.AccessToken]() {
			return CloudCacheAccessToken{.Value = AccessToken, .ExpireTime = GcClock::TimePoint::max()};
		});
	}
	else
	{
		TokenProvider =
			CloudCacheTokenProvider::CreateFromCallback([&AuthManager = Options.AuthManager, OpenIdProvider = Options.OpenIdProvider]() {
				AuthMgr::OpenIdAccessToken Token = AuthManager.GetOpenIdAccessToken(OpenIdProvider.empty() ? "Default" : OpenIdProvider);
				return CloudCacheAccessToken{.Value = Token.AccessToken, .ExpireTime = Token.ExpireTime};
			});
	}

	Ref<CloudCacheClient> CloudClient(new CloudCacheClient(ClientOptions, std::move(TokenProvider)));

	std::shared_ptr<RemoteProjectStore> RemoteStore = std::make_shared<JupiterRemoteStore>(std::move(CloudClient),
																						   Options.Namespace,
																						   Options.Bucket,
																						   Options.Key,
																						   Options.OptionalBaseKey,
																						   Options.ForceDisableBlocks,
																						   Options.ForceDisableTempBlocks,
																						   TempFilePath);
	return RemoteStore;
}

}  // namespace zen
